/* SPDX-License-Identifier: Apache-2.0 OR MIT
 *
 * SPDX-FileCopyrightText: Copyright 2015 Micron Technology, Inc.
 */

#include <ctype.h>
#include <errno.h>
#include <getopt.h>
#include <math.h>
#include <pthread.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sysexits.h>
#include <unistd.h>

#include <sys/time.h>

#include <hse/hse.h>
#include <hse/version.h>

#include <hse/cli/program.h>
#include <hse/util/atomic.h>
#include <hse/util/parse_num.h>

#include <hse/tools/parm_groups.h>

#include "rsgen.h"

/*
 * Test Goals
 * ----------
 *
 * The primary goal of longtest is to exercise a KVS for a long duration (day,
 * weeks, months?) in such a way that the KVS state can be precisely verified
 * during the test as well as at the end of the test.
 *
 * Secondary goals are:
 *   - Mix data with different lifecycles.
 *   - Repeatable at the thread level (i.e., a seed determines each thread's
 *     sequence of operations, but cannot control how they are interleaved).
 *   - Configurable with respect to:
 *       + key/value characteristics (binary/ascii, min/max len)
 *       + printf-like specification of keys and values
 *       + distribution of keys among threads
 *
 * Overview
 * --------
 *
 * The test uses multiple threads, each configured with a different number of
 * keys.  Each thread runs through the following steps:
 *
 *     while ( not time to stop ) {
 *       insert all N keys (N is different for each thread);
 *       update some keys;
 *       delete some keys;
 *       verify some keys;
 *       delete remaining keys;
 *     }
 *
 * Typically N varies significantly across threads.  The idea is to mix
 * key/value pairs with different life expectancies in the same test run.
 * For example, thread 0 (which always has the largest N) might have millions
 * of keys so that each cycle takes hours, while thread 15 has 1000 keys so
 * that all of its keys are short lived.
 *
 *
 * Simplified Details
 * ------------------
 *
 * There are P threads: T[0], T[1], ... T[P-1].  Each thread 'i' has
 * 'T[i]->num_keys' unique keys and values (i.e., T[0]'s keys do not overlap
 * with T[1]'s keys).  Each thread 'i' runs through 'T[i]->num_iters'
 * iterations.  The logic for a single iteration is:
 *
 *   Setup:
 *     - Let N = T[i]->num_keys.
 *     - Let KEYS be the set of N keys generated by T[i]
 *     - Let KVS be the set of keys in the kvs.
 *     - Assert( KEYS \intersection KVS == \emptyset )
 *
 *   Load:
 *     - Insert keys 0 to N-1 with "initial" values.
 *     - Assert( KEYS \subset KVS );
 *     - Note: assertion uses \subset b/c threads are working on same KVS
 *       (albeit with their own set of unique keys).
 *
 *   Update some of the initial keys with updated values:
 *     - Insert keys 0 to N/2-1 with "updated" values.
 *
 *   Delete some of the updated keys:
 *     - Delete keys 0 to N/4-1.
 *
 *   Delete some keys that were not updated:
 *     - Delete keys 3*N/4 to N-1
 *
 *   Verify:
 *     - verify the 1st 25% of keys are not in KVS
 *     - verify the 2nd 25% of keys are in KVS with updated values
 *     - verify the 3rd 25% of keys are in KVS with initial values
 *     - verify the 4th 25% of keys are not in KVS
 *
 *   Cleanup:
 *     - delete the 2nd and 3rd quartiles to prep for next iteration
 *
 * The keys in each quartile are given names based on their lifecycle:
 *
 *     Quartile    Group Name    LifeCycle
 *     --------    ----------    -------------------
 *     1st 25%     put_up_del    PUD: put, update, delete
 *     2nd 25%     put_up        PU:  put, update
 *     3rd 25%     put           P:   put
 *     4th 25%     put_del       PD:  put, delete
 *
 *
 * The above description has been simplified to facilitate understanding.  The
 * actual implementation differs in the following ways:
 *   - The 4 groups (put_up_del, etc) are not equal size.  Typically the "put"
 *     group is 85% of N, and the other 3 groups are 5% each.
 *   - Only a small percentage of entries in each group are verified.
 *
 * Todo List
 * ---------
 *   - During the load step keys are inserted in order of keynum.  Subsequent
 *     updates, deletes and verifies are also done in order of keynum.  It
 *     would be more interesting if the subsequent steps were executed in
 *     pseudo-random keynum order.  The fact that multiple threads have
 *     different numbers of keys makes this behavior less onerous.
 */

/* MAX_VMAX sizes the per-thread work buffers in struct tstate and is the
 * buffer length handed to hse_kvs_get() by xkvs_get().
 * NOTE(review): MAX_KMAX is presumably the largest accepted key length; its
 * use is outside this part of the file.
 */
#define MAX_KMAX (2 * 1024)
#define MAX_VMAX RS_MAX_VALUE_LEN

/* Long test has three types of threads:
 * - The main thread that spawns other threads, prints periodic stats, and
 *   reaps other threads.
 * - Test threads that put, update, delete and verify keys in the KVDB.
 * - Auxiliary threads that do other things (e.g., periodically insert prefix
 *   tombstones in order to bump up the sequence number).
 */
#define MAX_TEST_THREADS 128
#define MAX_AUX_THREADS  1

/* Capacity of the per-thread arrays in struct test (test + auxiliary). */
#define MAX_THREADS (MAX_TEST_THREADS + MAX_AUX_THREADS)

/* == Section: Command Line Processing ================ */

/* Forward declarations: opts_parse() and the GET_VALUE/GET_DOUBLE macros call
 * these before their definitions appear.
 */
static void
syntax(const char *fmt, ...);
static void
usage(void);

/* Option identifiers, used as the 'val' member of the longopts[] entries and
 * therefore as the value getopt_long() returns for each option.
 *
 * Identifiers that are printable characters double as the short option
 * letter: opts_parse() builds the short-option string from them.  The
 * identifiers numbered from 1024 upward are not printable characters, so
 * those options can only be given by their long names.
 */
enum opt_enum {
    /* options that also have a single-letter form */
    opt_config = 'Z',
    opt_keys = 'c',
    opt_help = 'h',
    opt_num_iters = 'i',
    opt_dryrun = 'n',
    opt_duration = 's',
    opt_threads = 't',
    opt_version = 'V',
    opt_verbose = 'v',
    opt_errcnt = 'e',
    opt_verify = 'p',
    opt_log_stdout = 'l',
    opt_cursor = 'C',
    opt_sync = 'S',

    /* long-only options; values continue implicitly from 1024 */
    opt_seed = 1024,
    opt_once,
    opt_show,
    opt_nostats,
    opt_interactive,

    /* key distribution among threads */
    opt_exp,
    opt_poly,

    /* --mthread / --mphase */
    opt_mthread,
    opt_mphase,

    opt_seqnum_pfx,

    /* key and value length limits */
    opt_klen,
    opt_kmin,
    opt_kmax,
    opt_vlen,
    opt_vmin,
    opt_vmax,
};

/* Test phases, one bit per phase, so that a set of phases fits in a single
 * mask (see opts.mphase, set by --mphase).  The P / PU / PD / PUD suffixes
 * match the key-group abbreviations described in the file header comment.
 * NOTE(review): "REM" presumably means the keys remaining at the end of an
 * iteration; the code using these values is outside this part of the file.
 */
enum phase {
    PHASE_PUT_P = 1 << 0,
    PHASE_PUT_PU = 1 << 1,
    PHASE_DEL_P = 1 << 2,
    PHASE_DEL_PU = 1 << 3,
    PHASE_VER_P = 1 << 4,
    PHASE_VER_PU = 1 << 5,
    PHASE_VER_PD = 1 << 6,
    PHASE_VER_PUD = 1 << 7,
    PHASE_DEL_REM = 1 << 8,
    PHASE_VER_REM = 1 << 9,

    /* every bit set; the default for opts.mphase */
    PHASE_ALL = -1,
    /* 15 == low four bits: PUT_P | PUT_PU | DEL_P | DEL_PU */
    PHASE_PUD_MASK = 15,
};

/* Command-line settings.  opts_set_default() establishes the defaults and
 * opts_parse() overwrites them from argv.
 */
struct opts {
    const char *config; /* --config argument */
    const char *mpool;  /* not assigned by opts_parse(); shown by opts_show() */
    const char *kvs;    /* not assigned by opts_parse(); shown by opts_show() */

    uint64_t keys;      /* --keys */
    uint64_t duration;  /* --duration (seconds, per the usage text) */
    uint64_t num_iters; /* --num_iters */

    /* key/value length limits: --kmin/--kmax/--klen, --vmin/--vmax/--vlen */
    uint32_t kmin, kmax, vmin, vmax;

    uint32_t verify;       /* --verify (a percentage, per the usage text) */
    uint32_t seed;         /* --seed */
    uint32_t test_threads; /* --threads: number of test threads to start */

    uint32_t verbose;  /* --verbose: set explicitly or bumped once per -v */
    uint32_t sync;     /* --sync: (uint32_t)-1 when given without an argument */
    int mthread;       /* --mthread; default -1 */
    enum phase mphase; /* --mphase: mask of enum phase bits; default PHASE_ALL */
    bool cursor;       /* --cursor */
    bool distr_is_exp; /* true after --exp (the default), false after --poly */
    double distr_param; /* argument of --exp or --poly */

    int32_t max_errors;  /* --errcnt; default 1 */
    uint32_t seqnum_pfx; /* --seqnum-pfx: non-zero starts the auxiliary thread */

    bool version;     /* --version */
    bool help;        /* --help */
    bool dryrun;      /* --dryrun: the x* wrappers skip all HSE calls */
    bool stats;       /* cleared by --nostats */
    bool once;        /* --once */
    bool show;        /* --show */
    bool log_stdout;  /* --logs */
    bool interactive; /* --interactive */
};

/* Parameter-group state for the trailing "param=value" arguments.
 * db_oparms and kv_oparms supply the parameter vectors passed to
 * hse_kvdb_open() and hse_kvdb_kvs_open() in xkvdb_kvs_open().
 * NOTE(review): these are presumably populated in main(), outside this view.
 */
struct parm_groups *pg;
struct svec hse_gparm = { 0 };
struct svec db_oparms = { 0 };
struct svec kv_oparms = { 0 };

/* per-thread state; one entry per thread lives in test.ts[] */
struct tstate {

    /* constant for duration of test */
    uint32_t id;
    uint64_t key_space_offset;
    uint64_t num_keys;
    uint64_t num_iters;

    /* if using cursors, there is a prefix of the thread id */
    char pfxbuf[2];
    void *pfx;
    size_t pfxlen;

    /* current iteration */
    uint64_t iter_cnt;

    /* stats */
    uint64_t put_cnt;
    uint64_t upd_cnt;
    uint64_t get_cnt;
    uint64_t del_cnt;

    /* thread management */
    pthread_t thread; /* filled in by pthread_create() in start_threads() */
    bool running;     /* set by start_threads() just before the thread is created */
    bool joined;      /* set by join_threads() once the thread has been reaped */

    /* work buffers (note: genkey is sized by MAX_VMAX, not MAX_KMAX) */
    char genkey[MAX_VMAX];  /* generated key */
    char exp_val[MAX_VMAX]; /* expected value */
    char act_val[MAX_VMAX]; /* actual retrieved value */
    char msgbuf[MAX_VMAX * 2 + 256];
    char msgbuf1[MAX_VMAX * 2 + 256];
};

/* Operation counters; struct test keeps one per thread plus a total.
 * NOTE(review): presumably snapshots of the per-thread tstate counters taken
 * for the periodic stats output; the code filling them in is outside this view.
 */
struct test_stats {
    uint64_t puts;
    uint64_t upds;
    uint64_t gets;
    uint64_t dels;
    uint64_t iters;
    uint64_t ops;
};

/* Global test state shared by the main thread and all worker threads. */
struct test {
    void *kvdb_h; /* KVDB handle stored by xkvdb_kvs_open() */
    void *kvs_h;  /* KVS handle stored by xkvdb_kvs_open() */
    struct rsgen kgen;
    struct rsgen vgen;
    uint64_t start_time;
    uint32_t stat_rows_no_hdr;
    uint32_t running_threads; /* number of ts[] entries join_threads() reaps */
    pthread_barrier_t bar_sync; /* barrier waited on by bar_sync() */
    atomic_int errors;
    atomic_int test_complete;
    atomic_int hard_stop; /* set to 1 by hard_stop() */
    uint8_t *seqnum_pfx_key; /* allocated in start_threads() when --seqnum-pfx is set */
    struct test_stats tot;
    struct test_stats stats[MAX_THREADS];
    struct tstate ts[MAX_THREADS]; /* test threads first, then the auxiliary thread */
};

/* Counters of puts, deletes and updates.
 * NOTE(review): presumably one entry per key, indexed through the global
 * 'keystats' pointer; the code that uses it is outside this view.
 */
struct keystat {
    int ks_puts;
    int ks_dels;
    int ks_upds;
};

/* NOTE(review): keystats is NULL here; any allocation is outside this view. */
struct keystat *keystats = NULL;

/* The program's single option set and test state, used directly by the
 * helpers in this file (e.g. opt.dryrun, test.bar_sync).
 */
struct opts opt;
struct test test;

/* Long option table for getopt_long().  Every entry has a NULL 'flag', so
 * getopt_long() returns the entry's enum opt_enum value.  opts_parse() also
 * derives the short-option string from this table, so an entry whose value is
 * a printable character automatically gets a single-letter form.
 */
struct option longopts[] = { { "config", required_argument, NULL, opt_config },
                             { "dryrun", no_argument, NULL, opt_dryrun },
                             { "duration", required_argument, NULL, opt_duration },
                             { "exp", required_argument, NULL, opt_exp },
                             { "help", no_argument, NULL, opt_help },
                             { "num_iters", required_argument, NULL, opt_num_iters },
                             { "keys", required_argument, NULL, opt_keys },

                             { "klen", required_argument, NULL, opt_klen },
                             { "kmin", required_argument, NULL, opt_kmin },
                             { "kmax", required_argument, NULL, opt_kmax },

                             { "vlen", required_argument, NULL, opt_vlen },
                             { "vmin", required_argument, NULL, opt_vmin },
                             { "vmax", required_argument, NULL, opt_vmax },

                             { "poly", required_argument, NULL, opt_poly },
                             { "seed", required_argument, NULL, opt_seed },
                             { "once", no_argument, NULL, opt_once },
                             { "threads", required_argument, NULL, opt_threads },
                             { "verbose", optional_argument, NULL, opt_verbose },
                             { "show", no_argument, NULL, opt_show },
                             { "logs", no_argument, NULL, opt_log_stdout },
                             { "version", no_argument, NULL, opt_version },
                             { "errcnt", required_argument, NULL, opt_errcnt },
                             { "verify", required_argument, NULL, opt_verify },
                             { "cursor", no_argument, NULL, opt_cursor },
                             { "sync", optional_argument, NULL, opt_sync },
                             { "nostats", no_argument, NULL, opt_nostats },
                             { "interactive", no_argument, NULL, opt_interactive },

                             { "mthread", required_argument, NULL, opt_mthread },
                             { "mphase", required_argument, NULL, opt_mphase },

                             { "seqnum-pfx", required_argument, NULL, opt_seqnum_pfx },

                             /* all-zero sentinel terminates the table */
                             { 0, 0, 0, 0 } };

static void
opts_set_default(struct opts *opt)
{
    memset(opt, 0, sizeof(*opt));

    opt->keys = 32 * 1000 * 1000;
    opt->test_threads = 32;
    opt->seed = 0;

    opt->distr_is_exp = true;
    opt->distr_param = 1.0;

    opt->kmin = 20;
    opt->kmax = 30;
    opt->vmin = 80;
    opt->vmax = 120;

    opt->cursor = 0;
    opt->verify = 25;
    opt->verbose = 0;
    opt->log_stdout = false;
    opt->dryrun = false;

    opt->mthread = -1;
    opt->mphase = PHASE_ALL;

    opt->mpool = NULL;

    opt->duration = 0;
    opt->num_iters = 0;
    opt->max_errors = 1;

    opt->stats = 1;
}

/* Print the current option values to stdout, one "name = value" line each.
 *
 * Every argument is passed with a type that matches its conversion specifier:
 * the 64-bit counters are cast to unsigned long long for %llu, the signed
 * error limit uses %d, and unsigned values use %u.
 */
void
opts_show(struct opts *opt)
{
    printf("  mpool          =  %s\n", opt->mpool ?: "<none>");
    printf("  kvs            =  %s\n", opt->kvs ?: "<none>");
    printf("  num keys       =  %llu\n", (unsigned long long)opt->keys);
    printf("  duration       =  %llu\n", (unsigned long long)opt->duration);
    printf("  iterations     =  %llu\n", (unsigned long long)opt->num_iters);
    printf("  min key len    =  %u\n", opt->kmin);
    printf("  max key len    =  %u\n", opt->kmax);
    printf("  min value len  =  %u\n", opt->vmin);
    printf("  max value len  =  %u\n", opt->vmax);
    printf("  seed           =  %u\n", opt->seed);
    printf(
        "  distr          =  %s:%g\n", (opt->distr_is_exp ? "exponential" : "polynomial"),
        opt->distr_param);
    printf("  threads        =  %u\n", opt->test_threads);
    printf("  verbose        =  %u\n", opt->verbose);
    printf("  logs           =  %d\n", opt->log_stdout);
    printf("  dryrun         =  %d\n", opt->dryrun);
    printf("  errcnt         =  %d\n", opt->max_errors);
    printf("  verify         =  %u\n", opt->verify);
    printf("  mphase         =  0x%x\n", (unsigned int)opt->mphase);
    printf("  mthread        =  %d\n", opt->mthread);
    printf("  cursor         =  %d\n", opt->cursor);
    /* sync is stored unsigned but uses (uint32_t)-1 for "all threads";
     * convert it so that case still prints as -1.
     */
    printf("  sync           =  %d\n", (int)opt->sync);
    printf("  stats          =  %d\n", opt->stats);
}

/* Parse OPTARG with parse_<TYPE>() into *VALUE; a non-zero return from the
 * parser is reported through syntax(), which does not return.
 */
#define GET_VALUE(TYPE, OPTARG, VALUE)                                     \
    do {                                                                   \
        if (parse_##TYPE(OPTARG, VALUE))                                   \
            syntax("Unable to parse " #TYPE " number: '%s'", OPTARG);      \
    } while (0)

/* Parse OPTARG as a floating point number into *VALUE (a double *); anything
 * sscanf() cannot convert is reported through syntax(), which does not return.
 */
#define GET_DOUBLE(OPTARG, VALUE)                                          \
    do {                                                                   \
        if (sscanf(OPTARG, "%lg", VALUE) != 1)                             \
            syntax("Unable to parse floating point number: '%s'", OPTARG); \
    } while (0)

/* Parse the command line into *opt.
 *
 * The short-option string is derived from longopts[]: every entry whose value
 * is a printable character becomes a short option, followed by ':' when its
 * argument is required or "::" when it is optional.  Parsing stops at "--" or
 * at the end of the options; positional arguments are left for the caller
 * (via optind).
 *
 * Malformed values, missing arguments and unknown options are reported
 * through syntax(), which terminates the program.  After parsing, --help
 * prints the usage text and --version prints the library version; neither
 * exits here.
 */
void
opts_parse(int argc, char **argv, struct opts *opt)
{
    /* Each table entry contributes at most three characters ("x::"). */
    const size_t opstrsz = (sizeof(longopts) / sizeof(longopts[0])) * 3 + 3;
    char opstr[opstrsz + 1];
    const struct option *longopt;
    char *pc = opstr;

    *pc++ = ':'; /* Disable getopt error messages */

    for (longopt = longopts; longopt->name; ++longopt) {
        const int val = longopt->val;

        /* isprint() is only defined for values representable as unsigned
         * char (and EOF).  The long-only options are numbered from 1024, so
         * range-check before classifying the value.
         */
        if (longopt->flag || val <= 0 || val > 0xff || !isprint(val))
            continue;

        *pc++ = (char)val;
        if (longopt->has_arg == required_argument) {
            *pc++ = ':';
        } else if (longopt->has_arg == optional_argument) {
            *pc++ = ':';
            *pc++ = ':';
        }
    }
    *pc = '\000';

    for (;;) {
        int curind = optind;
        int longidx = 0;
        int c;

        c = getopt_long(argc, argv, opstr, longopts, &longidx);
        if (-1 == c)
            break; /* got '--' or end of arg list */

        switch (c) {

        case opt_verbose:
            if (optarg)
                GET_VALUE(u32, optarg, &opt->verbose);
            else
                ++opt->verbose;
            break;

        case opt_sync:
            if (optarg)
                GET_VALUE(u32, optarg, &opt->sync);
            else
                opt->sync = -1; /* wraps to UINT32_MAX */
            break;

        case opt_help:
            opt->help = true;
            break;
        case opt_config:
            opt->config = optarg;
            break;
        case opt_dryrun:
            opt->dryrun = true;
            break;
        case opt_once:
            opt->once = true;
            break;
        case opt_show:
            opt->show = true;
            break;
        case opt_nostats:
            opt->stats = false;
            break;
        case opt_interactive:
            opt->interactive = true;
            break;
        case opt_log_stdout:
            opt->log_stdout = true;
            break;
        case opt_version:
            opt->version = true;
            break;

        case opt_kmin:
            GET_VALUE(u32, optarg, &opt->kmin);
            break;
        case opt_kmax:
            GET_VALUE(u32, optarg, &opt->kmax);
            break;
        case opt_klen:
            /* fixed length: min == max */
            GET_VALUE(u32, optarg, &opt->kmax);
            opt->kmin = opt->kmax;
            break;

        case opt_vmin:
            GET_VALUE(u32, optarg, &opt->vmin);
            break;
        case opt_vmax:
            GET_VALUE(u32, optarg, &opt->vmax);
            break;
        case opt_vlen:
            /* fixed length: min == max */
            GET_VALUE(u32, optarg, &opt->vmax);
            opt->vmin = opt->vmax;
            break;

        case opt_threads:
            GET_VALUE(u32, optarg, &opt->test_threads);
            break;

        case opt_keys:
            GET_VALUE(u64, optarg, &opt->keys);
            break;
        case opt_seed:
            GET_VALUE(u32, optarg, &opt->seed);
            break;

        case opt_mphase: {
            /* Parse into a real 32-bit integer; an 'enum phase *' is not
             * guaranteed to be compatible with the pointer the parser expects.
             */
            int32_t mphase = 0;

            GET_VALUE(s32, optarg, &mphase);
            opt->mphase = (enum phase)mphase;
            break;
        }
        case opt_mthread:
            GET_VALUE(s32, optarg, &opt->mthread);
            break;

        case opt_seqnum_pfx:
            GET_VALUE(u32, optarg, &opt->seqnum_pfx);
            break;

        case opt_errcnt:
            GET_VALUE(s32, optarg, &opt->max_errors);
            break;

        case opt_verify:
            GET_VALUE(u32, optarg, &opt->verify);
            break;

        case opt_cursor:
            opt->cursor = true;
            break;

        case opt_exp:
            opt->distr_is_exp = true;
            GET_DOUBLE(optarg, &opt->distr_param);
            break;

        case opt_poly:
            opt->distr_is_exp = false;
            GET_DOUBLE(optarg, &opt->distr_param);
            break;

        case opt_duration:
            GET_VALUE(u64, optarg, &opt->duration);
            break;

        case opt_num_iters:
            GET_VALUE(u64, optarg, &opt->num_iters);
            break;

        case ':':
            syntax("missing argument for option '%s'", argv[curind]);
            break;

        case '?':
            if (!strcmp(argv[optind - 1], "-m") || !strcmp(argv[optind - 1], "-k")) {
                /* Accept for backward compatibility. */
                break;
            }
            /* If the unknown arg starts with '-', then error.
             * Otherwise assume it is a positional parameter that
             * will be handled by the caller. */
            if (*argv[optind - 1] == '-')
                syntax("invalid option '%s'", argv[optind - 1]);
            break;

        default:
            if (c == 0) {
                /* Index the table itself: 'longopt' was left pointing at the
                 * sentinel entry by the loop above, so indexing from it would
                 * read past the end of longopts[].
                 */
                if (!longopts[longidx].flag) {
                    syntax("unhandled option '--%s'", longopts[longidx].name);
                }
            } else {
                syntax("unhandled option '%s'", argv[curind]);
            }
            break;
        }
    }

    if (opt->help)
        usage();
    else if (opt->version) {
        printf("HSE KVDB Lib:   %s\n", HSE_VERSION_STRING);
    }
}

/* Print the help text to stdout.
 *
 * The option summary is always printed; the extended explanation follows only
 * when opt.verbose is non-zero (i.e. -h combined with -v).  Option names and
 * phase bit values listed here mirror longopts[] and enum phase.
 */
static void
usage(void)
{
    printf("usage: %s [options] <kvdb> <kvs> [param=value ...]\n", progname);
    printf(
        "%s\n",
        "  -h, --help             show help (use -hv for more help)\n"
        "  -t, --threads THREADS  specify the number of threads\n"
        "  -c, --keys COUNT       put/get COUNT keys\n"
        "  -i, --num_iters ITER   run for given number of iterations\n"
        "  -s, --duration SECS    specify the test duration in seconds\n"
        "  -n, --dryrun           show operations w/o touching kvs\n"
        "  -V, --version          print build version\n"
        "  -v, --verbose[=LVL]    increase[or set] verbosity (0=quiet)\n"
        "  -e, --errcnt N         stop after N errors, 0=infinite\n"
        "  -l, --logs             mirror logs stdout\n"
        "  -C, --cursor           use a cursor to verify\n"
        "  -S, --sync[=THREAD]    given thread calls sync once/iter\n"
        "                         (use -S or --sync=-1 for all threads)\n"
        "  -p, --verify PCT       set verify percentage\n"
        "      --once             run each thread through 1 iteration\n"
        "      --exp M            M > 0.0\n"
        "      --poly DEGREE      DEGREE >= 0.0\n"
        "      --klen LEN         fixed key length\n"
        "      --kmin LEN         min key length\n"
        "      --kmax LEN         max key length\n"
        "      --vlen LEN         fixed value length\n"
        "      --vmin LEN         min value length\n"
        "      --vmax LEN         max value length\n"
        "      --seed SEED\n"
        "      --show             show test parameters and exit\n"
        "      --nostats          do not show periodic stats\n"
        "      --interactive      run interactively (use with -t 1)\n"
        "      --seqnum-pfx LEN   issue frequent pfx delete operations\n"
        "                         for the side-effect of increasing\n"
        "                         KVDB sequence number\n");
    if (!opt.verbose)
        return;

    printf(
        "%s\n",
        "Set #threads, #keys, etc:\n"
        "    --threads THREADS    // set number of threads\n"
        "    --keys KEYS          // number of keys\n"
        "\n"
        "Set how to distribute keys among threads:\n"
        "    --exp M              // M > 0.0\n"
        "    --poly DEGREE        // DEGREE >= 0.0\n"
        "\n"
        "Define how long to run the test:\n"
        "    If ITER is zero or not set, then run test for DURATION\n"
        "    seconds, otherwise stop after thread 0 (which always\n"
        "    has the most keys) has completed ITER iterations.\n"
        "      --duration DURATION\n"
        "      --num_iters ITER\n"
        "\n"
        "Key and value sizes:\n"
        "    These settings define the amount of random data used in\n"
        "    keys and values.  The actual sizes will be a bit\n"
        "    larger due to embedded 'magic' data used by the test.\n"
        "      --kmin LEN, --kmax LEN // min/max key length\n"
        "      --vmin LEN, --vmax LEN // min/max value length\n"
        "\n"
        "PRNG seed:\n"
        "    --seed SEED\n"
        "\n"
        "Phases (for --mphase):\n"
        "    PUT_P    = 1 << 0\n"
        "    PUT_PU   = 1 << 1\n"
        "    DEL_P    = 1 << 2\n"
        "    DEL_PU   = 1 << 3\n"
        "    VER_P    = 1 << 4\n"
        "    VER_PU   = 1 << 5\n"
        "    VER_PD   = 1 << 6\n"
        "    VER_PUD  = 1 << 7\n"
        "    DEL_REM  = 1 << 8\n"
        "    VER_REM  = 1 << 9\n"
        "\n");
}

/* Report a command-line error and terminate.
 *
 * Formats the printf-style message (truncated to fit a 256-byte buffer),
 * writes it to stderr prefixed with the program name and followed by a hint
 * to use -h, then exits with EX_USAGE.  Does not return.
 */
static void
syntax(const char *fmt, ...)
{
    va_list args;
    char text[256];

    va_start(args, fmt);
    vsnprintf(text, sizeof(text), fmt, args);
    va_end(args);

    fprintf(stderr, "%s: %s, use -h for help\n", progname, text);
    exit(EX_USAGE);
}

/* Return the current time of day, from gettimeofday(), as a count of
 * microseconds.
 */
uint64_t
gtod_usec(void)
{
    const uint64_t usec_per_sec = 1000000;
    struct timeval now;

    gettimeofday(&now, NULL);

    return (uint64_t)now.tv_sec * usec_per_sec + (uint64_t)now.tv_usec;
}

/**
 * Returns an array of values, @norm, such that:
 *
 *   y[i] = ((i+1)/len) ** degree;
 *   total = y[0] + y[1] + ... y[len-1];
 *   norm = y[i] / total;
 *
 * Require that @degree >= 0.
 * - If @degree==0, the distribution is level.
 * - If @degree==1, the distribution is linear.
 * - If @degree==2, the distribution is quadratic.
 * - Typical range: 0..3.
 * - Larger values might make sense for can be used for 1M+ keys.
 */
int
poly_dist(double degree, uint32_t len, double *results)
{
    uint32_t i;
    double total;

    if (degree < 0.0) {
        fprintf(stderr, "Error: invalid --poly value: %g (must be >= 0.0)\n", degree);
        return -1;
    }

    total = 0.0;
    for (i = 0; i < len; i++) {
        results[i] = pow((double)(i + 1) / (double)len, degree);
        total += results[i];
    }

    /* normalized */
    for (i = 0; i < len; i++)
        results[i] = results[i] / total;

    return 0;
}

/**
 * Returns an array of values such that:
 *
 *   y[i] = (e ** (M*(i+1)/len)) - 1
 *   total = y[0] + y[1] + ... y[len-1];
 *   norm = y[i] / total;
 *
 * Require that @degree >= 0.
 * - If @degree==0, the distribution is level.
 * - If @degree==1, the distribution is linear.
 * - If @degree==2, the distribution is quadratic.
 * - Typical range: 0..3.
 * - Larger values might make sense for can be used for 1M+ keys.
 */
int
exp_dist(double m, uint32_t len, double *dist)
{
    uint32_t i;
    double total;

    if (m <= 0.0) {
        fprintf(stderr, "Error: invalid --exp value: %g (must be > 0.0)\n", m);
        return -1;
    }

    total = 0.0;
    for (i = 0; i < len; i++) {
        dist[i] = exp((m * (i + 1)) / (double)len) - 1.0;
        total += dist[i];
    }

    /* normalize */
    for (i = 0; i < len; i++)
        dist[i] = dist[i] / total;

    return 0;
}

int
normalize_dist(double *distr, uint32_t len, uint64_t scale, uint64_t *results)
{
    uint64_t tot = 0;
    int i;

    for (i = 0; i < len; i++) {
        double val = round(distr[i] * scale);

        if (val > (double)(UINT64_MAX)) {
            fprintf(
                stderr,
                "Error: overflow: "
                "try a more uniform distribution\n");
            return -1;
        }
        results[i] = (uint64_t)val;
        tot += results[i];
    }

    if (tot != scale) {
        double adjust = (double)scale - (double)tot;
        double val = (double)results[len - 1] + adjust;

        if (val > (double)(UINT64_MAX)) {
            fprintf(
                stderr,
                "Error: overflow: "
                "try a more uniform distribution\n");
            return -1;
        }
        results[len - 1] = (uint64_t)val;
    }

    return 0;
}

/* Open the KVDB named by @mp and, within it, the KVS named by @kvs, and store
 * both handles in @t.
 *
 * Does nothing and returns 0 in dry-run mode.  A failure to open either
 * object is fatal: a message naming the object that could not be opened is
 * written to stderr and the process exits with status 1.  On success the
 * return value is 0.
 *
 * @idxv is accepted for interface compatibility and is not used.
 */
uint64_t
xkvdb_kvs_open(struct test *t, unsigned int *idxv, const char *mp, const char *kvs)
{
    struct hse_kvdb *kvdb_h;
    struct hse_kvs *kvs_h;
    uint64_t err;

    (void)idxv;

    if (opt.dryrun)
        return 0;

    err = hse_kvdb_open(mp, db_oparms.strc, db_oparms.strv, &kvdb_h);
    if (err) {
        fprintf(stderr, "%s: Unable to open kvdb %s\n", progname, mp);
        exit(1);
    }

    err = hse_kvdb_kvs_open(kvdb_h, kvs, kv_oparms.strc, kv_oparms.strv, &kvs_h);
    if (err) {
        /* Name the KVS here, not the KVDB that was just opened successfully. */
        fprintf(stderr, "%s: Unable to open kvs %s\n", progname, kvs);
        exit(1);
    }

    t->kvdb_h = kvdb_h;
    t->kvs_h = kvs_h;

    return err;
}

/* Close the KVDB held in @t and return the result of hse_kvdb_close().
 * In dry-run mode nothing was opened, so this just returns 0.
 */
uint64_t
xkvdb_kvs_close(struct test *t)
{
    if (opt.dryrun)
        return 0;

    return hse_kvdb_close(t->kvdb_h);
}

/* Store @val (@vlen bytes) under @key (@klen bytes) in the test's KVS and
 * return the result of hse_kvs_put().  A no-op returning 0 in dry-run mode.
 */
uint64_t
xkvs_put(struct test *t, void *key, size_t klen, void *val, size_t vlen)
{
    return opt.dryrun ? 0UL : hse_kvs_put(t->kvs_h, 0, NULL, key, klen, val, vlen);
}

/* Look up @key (@klen bytes) in the test's KVS and return the result of
 * hse_kvs_get().
 *
 * *found is cleared first.  @val must have room for MAX_VMAX bytes, which is
 * the buffer length passed to hse_kvs_get(); @vlen is also the location
 * hse_kvs_get() is given for the value length.  In dry-run mode the function
 * returns 0 with *found false and *vlen untouched.
 */
uint64_t
xkvs_get(struct test *t, void *key, size_t klen, bool *found, void *val, size_t *vlen)
{
    *found = false;

    if (!opt.dryrun) {
        const size_t bufsz = MAX_VMAX;

        *vlen = bufsz; /* assign here so valgrind knows it's initialized */
        return hse_kvs_get(t->kvs_h, 0, NULL, key, klen, found, val, bufsz, vlen);
    }

    return 0UL;
}

/* Delete @key (@klen bytes) from the test's KVS and return the result of
 * hse_kvs_delete().  A no-op returning 0 in dry-run mode.
 */
uint64_t
xkvs_del(struct test *t, void *key, size_t klen)
{
    if (opt.dryrun)
        return 0UL;

    return hse_kvs_delete(t->kvs_h, 0, NULL, key, klen);
}

/* Issue a prefix delete for @key (@klen bytes) on the test's KVS and return
 * the result of hse_kvs_prefix_delete().  A no-op returning 0 in dry-run mode.
 */
uint64_t
xkvs_prefix_delete(struct test *t, void *key, size_t klen)
{
    if (opt.dryrun)
        return 0UL;

    return hse_kvs_prefix_delete(t->kvs_h, 0, NULL, key, klen);
}

/* Create a cursor over the test's KVS, storing it in *cur, and return the
 * result of hse_kvs_cursor_create().
 *
 * @pfx/@plen give the prefix passed to the cursor; when @pfx is NULL the
 * length passed is 0 regardless of @plen.  In dry-run mode no cursor is
 * created, *cur is left untouched and 0 is returned.
 */
uint64_t
xkvdb_scan_begin(struct test *t, void *pfx, size_t plen, void **cur)
{
    struct hse_kvs_cursor **cursorp = (struct hse_kvs_cursor **)cur;
    int filt_len = pfx ? plen : 0;

    if (opt.dryrun)
        return 0UL;

    return hse_kvs_cursor_create(t->kvs_h, 0, NULL, pfx, filt_len, cursorp);
}

/* Read the next entry from cursor @cur via hse_kvs_cursor_read() and return
 * its result.
 *
 * On success *key/*klen and *val/*vlen are filled in by the cursor read; when
 * the read reports end-of-file, *klen is set to 0 so callers can detect it.
 * In dry-run mode nothing is read, the outputs are left untouched and 0 is
 * returned.
 */
uint64_t
xkvdb_scan_read(void *cur, void **key, size_t *klen, void **val, size_t *vlen)
{
    bool at_end;
    uint64_t rc;

    if (opt.dryrun)
        return 0;

    rc = hse_kvs_cursor_read(
        cur, 0, (const void **)key, klen, (const void **)val, vlen, &at_end);

    /* Only trust the end-of-file flag when the read itself succeeded. */
    if (!rc && at_end)
        *klen = 0;

    return rc;
}

/* Destroy cursor @cur and return the result of hse_kvs_cursor_destroy().
 * A no-op returning 0 in dry-run mode.
 */
uint64_t
xkvdb_scan_end(void *cur)
{
    if (opt.dryrun)
        return 0UL;

    return hse_kvs_cursor_destroy(cur);
}

/* Wait on the global barrier test.bar_sync.
 *
 * Both 0 and PTHREAD_BARRIER_SERIAL_THREAD are successful returns from
 * pthread_barrier_wait().  Any other return is fatal: the error is reported
 * on stderr and the process exits with status -1.
 */
void
bar_sync(void)
{
    int rc;

    rc = pthread_barrier_wait(&test.bar_sync);
    if (rc == 0 || rc == PTHREAD_BARRIER_SERIAL_THREAD)
        return;

    /* pthread functions return the error number rather than setting errno,
     * so format the returned code instead of using perror().
     */
    fprintf(stderr, "Error: pthread_barrier_wait: %s\n", strerror(rc));
    exit(-1);
}

/* Render @data_len bytes of @data as lowercase hexadecimal text in @out.
 *
 * The text needs 2 * @data_len characters plus a terminating NUL.  If that
 * fits in @out_len bytes the hex string is written; otherwise "****"
 * (truncated to fit) is written as a placeholder.  Nothing is written when
 * @out_len is 0.
 *
 * Returns @out.
 */
char *
hexstr(char *data, size_t data_len, char *out, size_t out_len)
{
    /* Equivalent to (2 * data_len + 1 <= out_len), arranged so that a huge
     * data_len cannot wrap the computation and pass the check.
     */
    if (out_len > 0 && data_len <= (out_len - 1) / 2) {
        static const char nybble2xdigit[] = "0123456789abcdef";
        size_t i;

        for (i = 0; i < data_len; i++) {
            const uint8_t byte = (uint8_t)data[i];

            out[i * 2] = nybble2xdigit[byte >> 4];
            out[i * 2 + 1] = nybble2xdigit[byte & 0xfu];
        }
        out[data_len * 2] = '\000';
    } else {
        snprintf(out, out_len, "****");
    }

    return out;
}

void
hard_stop(void)
{
    atomic_set(&test.hard_stop, 1);
}

/* Thread entry points, defined later in the file.  start_threads() runs
 * test_main() in every test thread and seqnum_pfx_thread() in the auxiliary
 * thread; each receives a pointer to its struct tstate as @rock.
 */
void *
test_main(void *rock);
void *
seqnum_pfx_thread(void *rock);

/*
 * Launch all test threads.
 *
 * One test_main() worker is started per configured test thread, using
 * consecutive entries of test.ts.  If opt.seqnum_pfx is non-zero, one
 * additional seqnum_pfx_thread() is started on the next entry, after
 * building its prefix key (opt.seqnum_pfx bytes: 'X' characters followed
 * by a terminating NUL).
 *
 * Any failure terminates the process.  pthread_create() reports errors
 * through its return value (it does not set errno), so the message is
 * built from that value.
 */
void
start_threads(void)
{
    struct tstate *ts = test.ts;
    int rc;
    int i;

    for (i = 0; i < opt.test_threads; i++, ts++) {
        /* Mark the slot live before the thread can possibly clear it. */
        ts->running = true;

        rc = pthread_create(&ts->thread, NULL, test_main, ts);
        if (rc) {
            fprintf(stderr, "pthread_create: %s\n", strerror(rc));
            exit(1);
        }
    }

    if (opt.seqnum_pfx) {
        size_t pfx_key_len = opt.seqnum_pfx;

        test.seqnum_pfx_key = malloc(pfx_key_len);
        if (!test.seqnum_pfx_key) {
            fprintf(stderr, "Error: out of memory\n");
            exit(1);
        }

        memset(test.seqnum_pfx_key, 'X', pfx_key_len);
        test.seqnum_pfx_key[pfx_key_len - 1] = 0;

        ts->running = true;

        rc = pthread_create(&ts->thread, NULL, seqnum_pfx_thread, ts);
        if (rc) {
            fprintf(stderr, "pthread_create: %s\n", strerror(rc));
            exit(1);
        }
    }
}

/*
 * Wait for every thread slot in test.ts[0 .. running_threads) to finish.
 *
 * Slots whose `running` flag is still set are joined with a 100ms timed
 * join so that one slow thread cannot block progress reporting.  Slots
 * whose `running` flag is clear are counted as finished without a join.
 * A progress line is written to stderr about once a second.
 *
 * NOTE(review): threads clear `running` themselves just before they
 * return, so a thread that has already finished is counted through the
 * "never ran" branch and is not actually joined -- confirm whether that
 * is intended.
 */
void
join_threads(void)
{
    const long one_sec = 1000 * 1000 * 1000; /* nanoseconds */
    const long tenth_sec = one_sec / 10;
    uint64_t last_report = gtod_usec();
    unsigned joined = 0;
    uint64_t now;
    int i;

    while (joined != test.running_threads) {

        for (i = 0; i < test.running_threads; i++) {
            struct tstate *ts = test.ts + i;
            struct timespec time;
            void *retval;

            if (ts->joined)
                continue;

            if (!ts->running) {
                /* this thread never even ran */
                ts->joined = true;
                joined++;
                continue;
            }

            /* Absolute deadline 100ms from now.  tv_nsec must end up in
             * [0, one_sec), so a value of exactly one_sec also has to be
             * carried into tv_sec.
             */
            clock_gettime(CLOCK_REALTIME, &time);
            time.tv_nsec += tenth_sec;
            while (time.tv_nsec >= one_sec) {
                time.tv_nsec -= one_sec;
                time.tv_sec += 1;
            }

            if (0 == pthread_timedjoin_np(ts->thread, &retval, &time)) {
                ts->joined = true;
                joined++;
            }
        }

        now = gtod_usec();
        if (now - last_report > 1000000) {
            fprintf(
                stderr,
                "Waiting for threads:"
                " %u of %u threads have finished\n",
                joined, test.running_threads);
            last_report = now;
        }
        usleep(1000);
    }
}

/*
 * In interactive mode, prompt the operator and block until a full line
 * (or end of input) has been read from stdin.  Does nothing when
 * opt.interactive is off.
 *
 * @action: optional description of the step about to run; may be NULL.
 */
void
interact(const char *action)
{
    char input[64];

    if (!opt.interactive)
        return;

    if (action)
        printf("Press CR to continue with %s:", action);
    else
        printf("Press CR to continue:");
    fflush(stdout);

    /* A line longer than the buffer arrives in pieces; keep reading
     * until the piece that ends with the newline shows up.
     */
    while (fgets(input, sizeof(input), stdin)) {
        const size_t len = strlen(input);

        if (len == 0 || input[len - 1] == '\n')
            break;
    }
}

/*
 * Put a contiguous range of this thread's keys.
 *
 * @ts:             per-thread state
 * @first_key:      first key of the range, relative to the thread's
 *                  key space offset
 * @num_keys:       number of keys in the range
 * @key_range_name: label used in log messages
 * @tag:            value tag; 'P' counts as an insert, anything else
 *                  counts as an update
 *
 * Stops early if a hard stop has been requested.  A failed put is
 * reported on stderr and ends the range.
 *
 * NOTE(review): unlike do_deletes(), a failed put does not request a
 * hard stop -- confirm whether that difference is intended.
 */
void
do_puts(struct tstate *ts, uint64_t first_key, uint64_t num_keys, char *key_range_name, char tag)
{
    uint64_t keynum;
    uint32_t vlen;
    uint32_t klen;
    uint64_t rc;
    uint64_t i; /* same width as num_keys so large ranges cannot overflow it */

    if (opt.verbose > 2 || opt.interactive)
        printf("T%03u I%lu: insert(%s) %lu keys\n", ts->id, ts->iter_cnt, key_range_name, num_keys);
    interact(0);

    for (i = 0; i < num_keys; i++) {

        if (atomic_read(&test.hard_stop))
            break;

        keynum = i + ts->key_space_offset + first_key;

        /* Keys are always generated with tag 0; values carry the tag. */
        rsgen_str(&test.kgen, ts->id, keynum, ts->iter_cnt, 0, ts->genkey, &klen);

        if (opt.vmax)
            rsgen_str(&test.vgen, ts->id, keynum, ts->iter_cnt, tag, ts->exp_val, &vlen);
        else
            vlen = 0;

        if (opt.verbose > 3)
            printf(
                "T%03d I%lu: hse_kvs_put(%s) "
                "key#%lu len=%u key=%s%s%s\n",
                ts->id, ts->iter_cnt, key_range_name, keynum, klen,
                hexstr(ts->genkey, klen, ts->msgbuf, sizeof(ts->msgbuf)),
                (opt.verbose <= 4 ? "" : " val="),
                (opt.verbose <= 4 ? ""
                                  : hexstr(ts->exp_val, vlen, ts->msgbuf1, sizeof(ts->msgbuf1))));

        rc = xkvs_put(&test, ts->genkey, klen, ts->exp_val, vlen);
        if (rc) {
            fprintf(
                stderr,
                "%s: T%03d I%lu: %s %s %lu: "
                "hse_kvs_put failed\n",
                progname, ts->id, ts->iter_cnt, "put", key_range_name, keynum);
            return;
        }

        if (tag == 'P') {
            if (keystats)
                keystats[keynum].ks_puts++;
            ts->put_cnt += 1;
        } else {
            ts->upd_cnt += 1;
            if (keystats)
                keystats[keynum].ks_upds++;
        }
    }
}

/*
 * Delete a contiguous range of this thread's keys.
 *
 * @ts:             per-thread state
 * @first_key:      first key of the range, relative to the thread's
 *                  key space offset
 * @num_keys:       number of keys in the range
 * @key_range_name: label used in log messages
 *
 * Stops early if a hard stop has been requested.  A failed delete is
 * reported on stderr and requests a hard stop.
 */
void
do_deletes(struct tstate *ts, uint64_t first_key, uint64_t num_keys, char *key_range_name)
{
    const uint64_t base = ts->key_space_offset + first_key;
    uint64_t n;

    if (opt.verbose > 2 || opt.interactive)
        printf("T%03u I%lu: delete(%s) %lu keys\n", ts->id, ts->iter_cnt, key_range_name, num_keys);
    interact(0);

    for (n = 0; n < num_keys && !atomic_read(&test.hard_stop); n++) {
        const uint64_t keynum = n + base;
        uint32_t klen;

        /* Regenerate the key exactly as it was generated for the put. */
        rsgen_str(&test.kgen, ts->id, keynum, ts->iter_cnt, 0, ts->genkey, &klen);

        if (opt.verbose > 3)
            printf(
                "T%03d I%lu: kvs_del(%s) "
                "key#%lu len=%u key=%s\n",
                ts->id, ts->iter_cnt, key_range_name, keynum, klen,
                hexstr(ts->genkey, klen, ts->msgbuf, sizeof(ts->msgbuf)));

        if (xkvs_del(&test, ts->genkey, klen)) {
            fprintf(
                stderr,
                "%s: T%03d I%lu: delete %s %lu: "
                "kvs_del failed\n",
                progname, ts->id, ts->iter_cnt, key_range_name, keynum);
            hard_stop();
            return;
        }

        ts->del_cnt += 1;
        if (keystats)
            keystats[keynum].ks_dels++;
    }
}

/*
 * Record and report one verification error.
 *
 * Bumps the global error counter and writes the details to stderr: the
 * failing key, then either the expected value (when not verifying a
 * delete) or the per-key operation counts (when verifying a delete and
 * per-key stats are available), then the retrieved value if one was
 * found.
 *
 * @keynum may come from decoding a key read back from the KVS, so it is
 * range-checked against opt.keys (the size of the keystats array)
 * before it is used as an index.
 *
 * Returns true when opt.max_errors is positive and has been reached,
 * i.e. the caller should stop the test.
 */
bool
do_error(
    struct tstate *ts,
    uint64_t keynum,
    char *key_range_name,
    const char *errmsg,
    void *key,
    uint32_t klen,
    void *val,
    uint32_t vlen,
    void *exp,
    uint32_t explen,
    int verify_delete,
    int found)
{
    int errors = atomic_fetch_add(&test.errors, 1) + 1;

    fprintf(
        stderr, "Error #%d: T%03d I%lu: verify %s: %s\n", errors, ts->id, ts->iter_cnt,
        key_range_name, errmsg);

    fprintf(
        stderr, "Error #%d: key#%lu len=%u: %s\n", errors, keynum, klen,
        hexstr(key, klen, ts->msgbuf, sizeof(ts->msgbuf)));

    if (!verify_delete)
        fprintf(
            stderr, "Error #%d: expected %u byte value: %s\n", errors, explen,
            hexstr(exp, explen, ts->msgbuf, sizeof(ts->msgbuf)));
    else if (keystats && keynum < opt.keys)
        fprintf(
            stderr, "keynum %lu puts %d upds %d dels %d\n", keynum, keystats[keynum].ks_puts,
            keystats[keynum].ks_upds, keystats[keynum].ks_dels);

    if (found)
        fprintf(
            stderr, "Error #%d: retrieved %u byte value: %s\n", errors, vlen,
            hexstr(val, vlen, ts->msgbuf, sizeof(ts->msgbuf)));

    return opt.max_errors > 0 && errors >= opt.max_errors;
}

/*
 * Cursor verification operates over the entire (prefix) tree,
 * after all the P,U,D phases are complete. The order of these
 * is important, as it describes d1_len, u_len, and p_len.
 *      PPPPPPPPPP put phase
 *      UUUUUPPPPP upd phase
 *      UUUUUPPPDD del-1 phase
 *      DDUUUPPPDD del-2 phase
 * p_len is the total number of ops.
 * u_len is the number of UPD ops.
 * d1_len is the number of DEL ops, and is the same for before and after.
 *
 * At the point cursor verification runs, the number of updates
 * is u_len - d1_len, the number of puts is p_len - u_len - d1_len.
 * Cursors do not show deleted keys, so these must be intuited by
 * the absence of any key numbers within this range.
 *
 * Each P operation encodes the keynum and iteration count within the key.
 * This is recoverable, and can be used to make assertions using the
 * above state transitions and the d1,u,p len values.
 *
 * Note: because a scan finds ALL keys in a kvs, longtest must be run
 * on an empty kvs, else it will find keys from a previous run.
 *
 * MU_REVISIT: this could be corrected by encoding a single byte
 * in the key which is an increasing cyclic integer.  If longtest
 * wrote a single unique key/value pair into a kvs, this would allow
 * multiple, independent longtest runs on the same kvs.  This could
 * also be solved by using this byte as a prefix: this would substantially
 * reduce the scan size to just that of this longtest, with no keys
 * to ignore.
 */

/*
 * Verify this thread's keys by scanning them with a cursor.
 *
 * @ts:     per-thread state
 * @d1_len: size of each deleted range (front and back)
 * @u_len:  size of the updated range
 * @p_len:  total number of keys put
 *
 * Every key returned by the scan is decoded back to (tid, keynum, iter)
 * and checked: it must belong to this thread, must lie inside the
 * not-deleted window [key_first, key_last), must carry the current
 * iteration count, and its value must match the regenerated expected
 * value.  After the scan the number of keys seen is compared with the
 * size of that window.
 *
 * NOTE(review): `err` is an int but receives the uint64_t results of the
 * xkvdb_scan_* wrappers, and is compared directly with EAGAIN; confirm
 * whether the HSE error value should be converted to an errno first.
 */
void
do_cursor_verify(struct tstate *ts, uint64_t d1_len, uint64_t u_len, uint64_t p_len)
{
    int err;
    void *key = 0, *val = 0, *exp = 0;
    size_t klen = 0, vlen = 0;
    uint explen = 0;
    void *cur = 0;
    const char *errmsg;
    uint64_t keynum = 0, iter = 0, count;
    uint32_t phase;

    /*
     * keys < key_first or >= key_last are found deleted keys
     * keys < key_upd have a value tag U, else value tag P
     */
    uint64_t key_first = ts->key_space_offset + d1_len;
    uint64_t key_last = ts->key_space_offset + p_len - d1_len;
    uint64_t key_upd = ts->key_space_offset + u_len;

    /*
     * phases can alter this key map:
     * if P, then no u or d
     * if PU, then no d
     * if PD, then no d1 offset from first
     * else full range
     *
     * phases MUST run in order: P PU PD PUD
     */

    phase = opt.mphase & PHASE_PUD_MASK;
    if (phase <= PHASE_DEL_P)
        key_first -= d1_len;
    if (phase <= PHASE_PUT_PU)
        key_last += d1_len;
    if (phase <= PHASE_PUT_P)
        key_upd = 0;

    if (opt.verbose > 2 || opt.interactive)
        printf("T%03u I%lu: verify with cursor\n", ts->id, ts->iter_cnt);
    interact(0);

    /*
     * In order to allow multiple threads, limit the cursor
     * to the data for this thread, as a prefix.
     */

    err = xkvdb_scan_begin(&test, ts->pfx, ts->pfxlen, &cur);
    if (err) {
        fprintf(
            stderr,
            "T%03d I%lu: Error: verify %s %lu: "
            "cannot begin scan: %d\n",
            ts->id, ts->iter_cnt, "cursor", key_first, err);
        /* allow a scan to fail because all resources changing */
        if (err != EAGAIN)
            hard_stop();
        return;
    }

    count = 0;
    for (;;) {
        uint16_t tid;
        bool vd = false;

        err = xkvdb_scan_read(cur, &key, &klen, &val, &vlen);
        if (err || klen == 0)
            break;

        /* Start each key with a clean slate so that nothing from the
         * previous key can leak into this key's error report.
         */
        errmsg = 0;
        exp = 0;
        explen = 0;
        ++count;

        /*
         * extract iter and keynum (id) from key
         * keys always have a tag of 0
         * values always have a tag P,U
         */
        /* [MU_REVISIT] this algorithm assumes incorrectly that
         * rsgen_decode() can reverse map keys >= 8 bytes to
         * keynums.  However, it cannot.
         */
        tid = rsgen_decode(key, klen, &keynum, &iter, 0);

        if (tid != ts->id) {
            errmsg = "found key from wrong tid";
        } else if (keynum < key_first || keynum >= key_last) {
            /* key_last is one past the last live key, hence ">=" */
            errmsg = "found deleted key";
            vd = true;
        } else {
            char tag = keynum < key_upd ? 'U' : 'P';

            exp = ts->exp_val;
            rsgen_str(&test.vgen, ts->id, keynum, iter, tag, exp, &explen);
        }

        if (!errmsg && iter != ts->iter_cnt)
            errmsg = "found key with wrong iter_cnt";
        else if (!errmsg) {
            if (vlen != explen)
                errmsg = "key found, but value length is wrong";
            else if (memcmp(val, exp, explen))
                errmsg = "key found, but value is wrong";
        }

        if (errmsg && !opt.dryrun &&
            do_error(ts, keynum, "cursor", errmsg, key, klen, val, vlen, exp, explen, vd, 1))
        {
            hard_stop();
            break;
        }
    }

    if (err) {
        fprintf(
            stderr,
            "T%03d I%lu: Error: verify %s %lu: "
            "error during scan: %d\n",
            ts->id, ts->iter_cnt, "cursor", keynum, err);
        hard_stop();
    }

    xkvdb_scan_end(cur);

    /* Report the same expected count that the comparison uses, so the
     * message stays correct when the phase adjustments above apply.
     */
    if (!atomic_read(&test.hard_stop) && count != key_last - key_first)
        fprintf(
            stderr,
            "T%03d I%lu: Error: verify %s %lu: "
            "found %lu keys, expected %lu\n",
            ts->id, ts->iter_cnt, "cursor", key_first, count, key_last - key_first);
}

/*
 * Verify a contiguous range of this thread's keys with point gets.
 *
 * @ts:             per-thread state
 * @first_key:      first key of the range, relative to the thread's
 *                  key space offset
 * @num_keys:       number of keys in the range
 * @key_range_name: label used in log messages
 * @tag:            'D' means the keys are expected to be absent; any
 *                  other tag is used to regenerate the expected value
 *
 * opt.verify is the percentage of the range to check: 0 disables
 * verification, values below 100 check only the leading part of the
 * range (at least 5 keys, never more than the range holds).
 *
 * A failed get requests a hard stop.  Mismatches are reported through
 * do_error(), which decides whether the error limit has been reached.
 */
void
do_verifies(
    struct tstate *ts,
    uint64_t first_key,
    uint64_t num_keys,
    char *key_range_name,
    char tag)
{
    const bool expect_deleted = (tag == 'D');
    uint64_t limit = num_keys;
    uint64_t n;

    if (opt.verify == 0)
        return;

    if (opt.verify < 100) {
        /* TODO: select a random sample to verify
         * instead of stopping after the first N
         * entries */
        limit = (uint64_t)(num_keys * (opt.verify / 100.0));
        if (limit < 5)
            limit = 5;
        if (limit > num_keys)
            limit = num_keys;
    }

    if (opt.verbose > 2 || opt.interactive)
        printf(
            "T%03u I%lu: verify(%s) %lu of %lu keys\n", ts->id, ts->iter_cnt, key_range_name,
            limit, num_keys);
    interact(0);

    for (n = 0; n < limit; n++) {
        const char *errmsg = 0;
        void *exp = 0;
        uint explen = 0;
        uint gen_klen;
        size_t klen, vlen;
        uint64_t keynum;
        bool found = false;

        if (atomic_read(&test.hard_stop))
            break;

        keynum = n + ts->key_space_offset + first_key;

        rsgen_str(&test.kgen, ts->id, keynum, ts->iter_cnt, 0, ts->genkey, &gen_klen);
        klen = gen_klen;

        if (opt.verbose > 3)
            printf(
                "T%03d I%lu: hse_kvs_get(%s) "
                "key#%lu len=%lu key=%s\n",
                ts->id, ts->iter_cnt, key_range_name, keynum, (size_t)klen,
                hexstr(ts->genkey, klen, ts->msgbuf, sizeof(ts->msgbuf)));

        if (xkvs_get(&test, ts->genkey, klen, &found, ts->act_val, &vlen)) {
            fprintf(
                stderr,
                "T%03d I%lu: Error: verify %s %lu: "
                "hse_kvs_get failed\n",
                ts->id, ts->iter_cnt, key_range_name, keynum);
            hard_stop();
            return;
        }

        ts->get_cnt += 1;

        if (expect_deleted) {
            if (found)
                errmsg = "found deleted key";
        } else {
            /* Regenerate the value that the put for this key wrote. */
            rsgen_str(&test.vgen, ts->id, keynum, ts->iter_cnt, tag, ts->exp_val, &explen);
            exp = ts->exp_val;

            if (!found)
                errmsg = "key not found";
            else if (vlen != explen)
                errmsg = "key found, but value length is wrong";
            else if (memcmp(ts->act_val, exp, explen))
                errmsg = "key found, but value is wrong";
        }

        /* Nothing to report, or nothing was really read (dry run). */
        if (!errmsg || opt.dryrun)
            continue;

        if (do_error(
                ts, keynum, key_range_name, errmsg, ts->genkey, klen, ts->act_val, vlen, exp,
                explen, expect_deleted, found))
            hard_stop();
    }
}

/*
 * Run one full put / update / delete / verify / cleanup cycle for a
 * thread.  Each step runs only if its bit is set in opt.mphase.
 *
 * Key ranges used by the cycle:
 *
 *   Put:      PPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP
 *   Update:   UUUUUUUUUUUUUUUU................
 *   Delete_1: ........................DDDDDDDD
 *   Delete_2: DDDDDDDD........................
 *
 *   final:    DDDDDDDDUUUUUUUUPPPPPPPPDDDDDDDD
 *
 * The update range is 10% of the keys and each delete range is 5%.  If
 * 5% rounds down to zero the sizes are bumped to 50% and 25%; they may
 * still come out as zero when the thread has very few keys.  Both
 * delete ranges have the same size.
 */
void
do_one_iteration(struct tstate *ts)
{
    const uint64_t nkeys = ts->num_keys;
    uint64_t u_len = nkeys / 10;
    uint64_t d1_len = nkeys / 20;
    uint64_t d2_len;

    if (d1_len == 0) {
        u_len = nkeys / 2;
        d1_len = nkeys / 4;
    }
    d2_len = d1_len;

    /* KVS State:  <empty> */
    if (opt.mphase & PHASE_PUT_P)
        do_puts(ts, 0, nkeys, "P", 'P');

    /* Sync from every thread (-1) or only from the selected thread. */
    if (opt.sync == -1 || opt.sync == ts->id) {
        interact("hse_kvdb_sync");
        hse_kvdb_sync(test.kvdb_h, 0);
    }

    /* KVS State:  PPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP */
    if (opt.mphase & PHASE_PUT_PU)
        do_puts(ts, 0, u_len, "PU", 'U');

    /* KVS State:  UUUUUUUUUUUUUUUUPPPPPPPPPPPPPPPP */
    if (opt.mphase & PHASE_DEL_P)
        do_deletes(ts, nkeys - d2_len, d2_len, "PD");

    /* KVS State:  UUUUUUUUUUUUUUUUPPPPPPPPDDDDDDDD */
    if (opt.mphase & PHASE_DEL_PU)
        do_deletes(ts, 0, d1_len, "PUD");

    /* KVS State:  DDDDDDDDUUUUUUUUPPPPPPPPDDDDDDDD */
    if (opt.cursor) {
        /* A single cursor scan replaces all four point-get passes. */
        do_cursor_verify(ts, d1_len, u_len, nkeys);
    } else {
        if (opt.mphase & PHASE_VER_P)
            do_verifies(ts, u_len, nkeys - d2_len - u_len, "P", 'P');
        if (opt.mphase & PHASE_VER_PU)
            do_verifies(ts, d1_len, u_len - d1_len, "PU", 'U');
        if (opt.mphase & PHASE_VER_PUD)
            do_verifies(ts, 0, d1_len, "PUD", 'D');
        if (opt.mphase & PHASE_VER_PD)
            do_verifies(ts, nkeys - d2_len, d2_len, "PD", 'D');
    }

    /* Delete remaining keys */
    if (opt.mphase & PHASE_DEL_REM)
        do_deletes(ts, d1_len, nkeys - d1_len - d2_len, "REM");

    /* Verify remaining keys deleted */
    if (opt.mphase & PHASE_VER_REM) {
        if (opt.cursor)
            do_cursor_verify(ts, 0, 0, 0);
        else
            do_verifies(ts, d1_len, nkeys - d1_len - d2_len, "REM", 'D');
    }
}

/*
 * Report whether the test threads should stop: either the test has been
 * marked complete or a hard stop has been requested.
 */
bool
test_done(void)
{
    if (atomic_read(&test.test_complete))
        return true;

    return atomic_read(&test.hard_stop) ? true : false;
}

void *
seqnum_pfx_thread(void *rock)
{
    struct tstate *ts = (struct tstate *)rock;
    uint64_t rc;

    bar_sync();

    while (!test_done()) {
        usleep(10 * 1000);
        rc = xkvs_prefix_delete(&test, test.seqnum_pfx_key, opt.seqnum_pfx);
        if (rc) {
            fprintf(stderr, "%s: hse_kvs_prefix_delete failed\n", progname);
            hard_stop();
        }
    }

    ts->running = false;
    return NULL;
}

void *
test_main(void *rock)
{
    int err = 0;
    struct tstate *ts = (struct tstate *)rock;

    if (ts->num_keys == 0) {
        fprintf(
            stderr,
            "Error: thread %d has no keys!\n"
            "Try one or more of the following:\n"
            "  1. more keys\n"
            "  2. fewer threads\n"
            "  3. smaller value with --poly or --exp\n",
            ts->id);
        err = 1;
    }

    if (err)
        hard_stop();

    bar_sync();

    do {
        if (opt.mthread == -1 || opt.mthread == (int)ts->id) {
            do_one_iteration(ts);
            ts->iter_cnt += 1;
        }

        if (opt.once || (ts->num_iters > 0 && ts->iter_cnt == ts->num_iters))
            atomic_set(&test.test_complete, 1);

    } while (!test_done());

    ts->running = false;
    return NULL;
}

/*
 * Initialize the global test state.
 *
 * Clears `test`, sets up the start barrier (all threads plus the main
 * thread), assigns thread ids, distributes opt.keys across the test
 * threads, initializes the key and value generators, sets up per-thread
 * cursor prefixes when cursor verification runs with several threads,
 * and records the iteration limit on the controlling thread.
 *
 * Returns 0 on success, -1 on failure (after printing a message and
 * releasing what was set up).
 */
int
test_init(void)
{
    double *dist = 0;
    uint64_t *keys = 0;
    uint64_t assigned_keys = 0;
    uint32_t i;
    uint32_t max_iter = 0;
    int rc;

    memset(&test, 0, sizeof(test));

    dist = malloc(sizeof(*dist) * opt.test_threads);
    keys = malloc(sizeof(*keys) * opt.test_threads);
    if (!dist || !keys) {
        fprintf(stderr, "Error: out of memory\n");
        goto error;
    }

    test.running_threads = opt.test_threads;
    if (opt.seqnum_pfx)
        ++test.running_threads;

    /* pthread_barrier_init() reports failure through its return value
     * (it does not set errno), so build the message from that value.
     */
    rc = pthread_barrier_init(&test.bar_sync, NULL, (unsigned)test.running_threads + 1);
    if (rc) {
        fprintf(stderr, "Error: pthread_barrier_init: %s\n", strerror(rc));
        goto error;
    }

    for (i = 0; i < test.running_threads; i++)
        test.ts[i].id = i;

    /* Determine how many keys each thread will manage. */
    if (opt.distr_is_exp) {
        if (exp_dist(opt.distr_param, opt.test_threads, dist))
            goto error;
    } else {
        if (poly_dist(opt.distr_param, opt.test_threads, dist))
            goto error;
    }

    if (normalize_dist(dist, opt.test_threads, opt.keys, keys))
        goto error;

    /* flip the distribution so thread 0 has the most keys */
    for (i = 0; i < opt.test_threads; i++) {
        struct tstate *ts = test.ts + i;
        uint64_t nkeys = keys[opt.test_threads - 1 - i];

        ts->key_space_offset = assigned_keys;
        ts->num_keys = nkeys;
        assigned_keys += nkeys;
    }
    if (opt.keys != assigned_keys) {
        fprintf(
            stderr,
            "Error: opt.keys (%lu) != assigned_keys (%lu)\n"
            "Bad logic in longtest!\n",
            opt.keys, assigned_keys);
        goto error;
    }

    /*
     * There are multiple iterations for every use case except
     *	-i1 --once or -i1 -t1 or -s1 -t100 -c1 (silly);
     * every other case has at least 2 iterations possible.
     *
     * With a min iteration elapsed time of 100ms (of interest),
     * there cannot be more than 86400 / .1 = 864,000 iterations / day.
     * Assuming constant iteration elapsed time, the thread with
     * least keys will run max / min + 1 times for each max thread.
     */

    max_iter = opt.duration ? opt.duration * 10 : opt.num_iters;

    /* A thread may have been assigned zero keys; skip the scaling in
     * that case instead of dividing by zero.
     *
     * NOTE(review): the comment above describes max / min, while this
     * expression divides keys[0] by keys[last]; confirm the operand
     * order against the ordering that normalize_dist() produces.
     */
    if (keys[opt.test_threads - 1] != 0)
        max_iter *= 1 + keys[0] / keys[opt.test_threads - 1];

    /* initialize key and value generators */
    if (rsgen_init(
            &test.kgen, opt.keys, max_iter, false, opt.kmin, opt.kmax, opt.test_threads, opt.seed))
    {
        fprintf(stderr, "Error: Invalid key length: %s\n", test.kgen.rs_errmsg);
        goto error;
    }
    if (rsgen_init(
            &test.vgen, opt.keys, max_iter, true, opt.vmin, opt.vmax, 0, opt.seed ^ 0x01010101))
    {
        fprintf(stderr, "Error: Invalid value length: %s\n", test.vgen.rs_errmsg);
        goto error;
    }

    /* if there are to be thread ids as prefixes, init them here */
    if (opt.cursor && opt.test_threads > 1) {
        for (i = 0; i < opt.test_threads; i++) {
            test.ts[i].pfx = test.ts[i].pfxbuf;
            test.ts[i].pfxlen = test.kgen.rs_prefix_tid_bytes;
            rsgen_set_tid(&test.kgen, test.ts[i].pfxbuf, i);
        }
    }

    /* set thread 0's iter count (or mthread if running a single thread) */
    if (opt.num_iters) {
        if (opt.mthread == -1)
            test.ts[0].num_iters = opt.num_iters;
        else
            test.ts[opt.mthread].num_iters = opt.num_iters;
    }

    free(dist);
    free(keys);
    return 0;

error:
    free(dist);
    free(keys);
    rsgen_fini(&test.kgen);
    if (opt.vmax)
        rsgen_fini(&test.vgen);
    return -1;
}

/*
 * Release test-wide resources: the key generator, the value generator
 * (only when opt.vmax is non-zero), and the optional per-key stats
 * array.  keystats is reset to NULL after being freed.
 */
void
test_fini(void)
{
    rsgen_fini(&test.kgen);
    if (opt.vmax)
        rsgen_fini(&test.vgen);

    free(keystats);
    keystats = NULL;
}

/*
 * Print the column headings for the periodic "stats:" rows written by
 * test_stats().
 */
void
test_stats_header(void)
{
    printf(
        "stats: %8s %8s %8s %8s %8s %8s %8s\n",
        "seconds",
        "put_ins",
        "put_upd",
        "put_del",
        "puts",
        "gets",
        "ops");
}

void
get_stats(
    struct test_stats curr[static MAX_THREADS],
    struct test_stats *tot,
    uint64_t *elapsed_time)
{
    uint32_t i;

    /* get "snapshot" of thread counters */
    for (i = 0; i < opt.test_threads; i++) {
        curr[i].puts = test.ts[i].put_cnt;
        curr[i].upds = test.ts[i].upd_cnt;
        curr[i].dels = test.ts[i].del_cnt;
        curr[i].gets = test.ts[i].get_cnt;
        curr[i].iters = test.ts[i].iter_cnt;
        curr[i].ops = (curr[i].puts + curr[i].upds + curr[i].gets + curr[i].dels);
    }

    *elapsed_time = gtod_usec() - test.start_time;

    /* compute totals */
    memset(tot, 0, sizeof(*tot));
    for (i = 0; i < opt.test_threads; i++) {
        tot->puts += curr[i].puts;
        tot->upds += curr[i].upds;
        tot->dels += curr[i].dels;
        tot->gets += curr[i].gets;
        tot->iters += curr[i].iters;
        tot->ops += curr[i].ops;
    }
}

void
test_stats(void)
{
    uint64_t elapsed_time;

    struct test_stats curr[MAX_THREADS];
    struct test_stats tot;
    struct test_stats delta;

    get_stats(curr, &tot, &elapsed_time);

    /* compute delta */
    delta.puts = tot.puts - test.tot.puts;
    delta.upds = tot.upds - test.tot.upds;
    delta.dels = tot.dels - test.tot.dels;
    delta.gets = tot.gets - test.tot.gets;
    delta.iters = tot.iters - test.tot.iters;
    delta.ops = tot.ops - test.tot.ops;

    /* update totals for next time */
    test.tot.puts = tot.puts;
    test.tot.upds = tot.upds;
    test.tot.dels = tot.dels;
    test.tot.gets = tot.gets;
    test.tot.iters = tot.iters;
    test.tot.ops = tot.ops;

    /* report */
    if (test.stat_rows_no_hdr > 20)
        test.stat_rows_no_hdr = 0;

    if (test.stat_rows_no_hdr == 0)
        test_stats_header();

    test.stat_rows_no_hdr++;

    printf(
        "stats: %8.3f %8lu %8lu %8lu %8lu %8lu %8lu\n", elapsed_time * 1e-6, delta.puts, delta.upds,
        delta.dels, delta.puts + delta.upds + delta.dels, delta.gets, delta.ops);
}

void
test_summary(void)
{
    int i;
    uint64_t elapsed_time;

    struct test_stats curr[MAX_THREADS];
    struct test_stats tot;
    struct test_stats *s;

    get_stats(curr, &tot, &elapsed_time);

    printf("Op counts by thread:\n");
    printf(
        "summary: %8s %8s %8s %8s %8s %8s %8s %8s\n", "thread", "iters", "put_ins", "put_upd",
        "put_del", "puts", "gets", "ops");

    for (i = 0; i < opt.test_threads; i++) {
        s = curr + i;
        printf(
            "summary: %8d %8lu %8lu %8lu %8lu %8lu %8lu %8lu\n", i, s->iters, s->puts, s->upds,
            s->dels, s->puts + s->upds + s->dels, s->gets, s->puts + s->upds + s->dels + s->gets);
    }

    s = &tot;
    printf(
        "summary: %8s %8lu %8lu %8lu %8lu %8lu %8lu %8lu\n", "Total", s->iters, s->puts, s->upds,
        s->dels, s->puts + s->upds + s->dels, s->gets, s->puts + s->upds + s->dels + s->gets);
}

/*
 * Report whether any test thread still has its `running` flag set.
 * Only the first opt.test_threads slots are examined.
 */
bool
test_threads_running(void)
{
    int t = 0;

    while (t < opt.test_threads) {
        if (test.ts[t].running)
            return true;
        t++;
    }

    return false;
}

/*
 * Run the test: open the KVS, start the threads, monitor them until
 * they finish, then join them, report, and close the KVS.
 *
 * While the test threads run, the main thread wakes every 10ms to
 * enforce opt.duration (by marking the test complete) and, when
 * opt.stats is set, to print a stats row about once a second.
 *
 * Returns 0 on success.  Returns -1 if the KVS could not be opened, if
 * closing it failed, or if any verify errors were recorded, so that the
 * caller does not mistake a run with verification failures for a clean
 * one.
 */
int
test_run(void)
{
    int errors;
    uint64_t rc;
    uint64_t elapsed_time;
    uint64_t last_report = 0;
    unsigned int idxv;

    rc = xkvdb_kvs_open(&test, &idxv, opt.mpool, opt.kvs);
    if (rc) {
        fprintf(stderr, "%s: kvs_open(%s,%s) failed\n", progname, opt.mpool, opt.kvs);
        return -1;
    }

    start_threads();
    bar_sync();
    test.start_time = gtod_usec();

    while (test_threads_running()) {

        elapsed_time = gtod_usec() - test.start_time;
        if (opt.duration && elapsed_time > opt.duration * 1000 * 1000)
            atomic_set(&test.test_complete, 1);

        if (opt.stats) {
            if (!last_report)
                last_report = elapsed_time;
            else if (elapsed_time - last_report > 1000 * 1000) {
                last_report = elapsed_time;
                test_stats();
            }
        }

        usleep(10 * 1000);
    }

    join_threads();

    if (opt.stats)
        test_stats();

    if (opt.verbose)
        test_summary();

    rc = xkvdb_kvs_close(&test);
    if (rc) {
        fprintf(stderr, "%s: kvs_close %s/%s failed\n", progname, opt.mpool, opt.kvs);
    }

    errors = atomic_read(&test.errors);
    if (errors)
        fprintf(stderr, "Error: %d verify errors occurred\n", errors);

    /* Collapse the 64-bit close status and the error count into the int
     * result explicitly rather than narrowing rc.
     */
    return (rc || errors) ? -1 : 0;
}

/*
 * Program entry point.
 *
 * Parses and validates the command line, collects the HSE parameter
 * groups, initializes the test state, optionally just shows the
 * resulting configuration (opt.show), and otherwise initializes HSE and
 * runs the test.  All exits through `done` release the test state, the
 * parameter groups and HSE.
 *
 * Returns 0 on success, otherwise the error from the step that failed.
 */
int
main(int argc, char **argv)
{
    int err = 0;
    int i;

    progname_set(argv[0]);

    setvbuf(stdout, NULL, _IOLBF, 0);

    opts_set_default(&opt);
    opts_parse(argc, argv, &opt);

    if (opt.version || opt.help)
        goto done;

    err = pg_create(&pg, PG_HSE_GLOBAL, PG_KVDB_OPEN, PG_KVS_OPEN, NULL);
    if (err) {
        fprintf(stderr, "pg_create: rc %d\n", err);
        goto done;
    }

    if (argc - optind < 2)
        syntax("Missing required parameters");

    opt.mpool = argv[optind++];
    opt.kvs = argv[optind++];

    if (opt.interactive && opt.test_threads != 1)
        syntax("Interactive mode only supported with one thread");

    /* iters and duration */
    if (opt.num_iters && opt.duration)
        syntax("Pick -i or -s but not both");
    if (!opt.num_iters && !opt.duration)
        opt.num_iters = 1;

    /* threads: zero is rejected, so the valid range starts at 1 */
    if (opt.test_threads == 0 || opt.test_threads > MAX_TEST_THREADS)
        syntax("Invalid thread count: %u (valid range: 1..%u)", opt.test_threads, MAX_TEST_THREADS);
    if (opt.mthread != -1)
        if (opt.mthread < 0 || opt.mthread >= opt.test_threads)
            syntax("Invalid mthread value: %d\n", opt.mthread);

    /* key len */
    if (opt.kmax > MAX_KMAX)
        syntax(
            "Invalid key len range: %u..%u\n"
            "Max key len is %u\n",
            opt.kmin, opt.kmax, MAX_KMAX);
    if (opt.kmin > opt.kmax)
        opt.kmin = opt.kmax;

    if (opt.cursor) {
        /* rsgen cannot reliably decode keys longer than 8 bytes */
        if (opt.kmax > 8)
            syntax("cursor verify requires kmin <= kmax <= 8");

        if (opt.verbose >= 1) {
            /* calloc zeroes the array and checks the size
             * multiplication for overflow.
             */
            keystats = calloc(opt.keys, sizeof(*keystats));
            if (!keystats)
                printf("WARNING: per-key stats unavailable\n");
        }
    }

    /* value len */
    if (opt.vmax > MAX_VMAX)
        syntax(
            "Invalid value len range: %u..%u\n"
            "Max vlen is %u\n",
            opt.vmin, opt.vmax, MAX_VMAX);
    if (opt.vmin > opt.vmax)
        opt.vmin = opt.vmax;

    /* mpool and kvs */
    if (!opt.mpool || !opt.kvs)
        syntax("Must indicate mpool and kvs");

    err = pg_parse_argv(pg, argc, argv, &optind);
    switch (err) {
    case 0:
        if (optind < argc) {
            fprintf(stderr, "unknown parameter: %s\n", argv[optind]);
            exit(EX_USAGE);
        }
        break;
    case EINVAL:
        fprintf(
            stderr, "missing group name (e.g. %s) before parameter %s\n", PG_KVDB_OPEN,
            argv[optind]);
        exit(EX_USAGE);
        break;
    default:
        fprintf(stderr, "error processing parameter %s\n", argv[optind]);
        exit(EX_OSERR);
        break;
    }

    /* Each append runs only if everything before it succeeded. */
    err = err ?: svec_append_pg(&hse_gparm, pg, PG_HSE_GLOBAL, NULL);
    err = err ?: svec_append_pg(&db_oparms, pg, PG_KVDB_OPEN, NULL);
    err = err ?: svec_append_pg(&kv_oparms, pg, PG_KVS_OPEN, NULL);
    if (err) {
        fprintf(stderr, "svec_append_pg failed: %d\n", err);
        exit(EX_OSERR);
    }

    /* No seed given: derive one from the current time. */
    if (!opt.seed)
        opt.seed = (uint32_t)gtod_usec();

    if (opt.verbose || opt.show)
        opts_show(&opt);

    err = test_init();
    if (err)
        goto done;

    /* show each thread's key count */
    if (opt.verbose || opt.show) {
        for (i = 0; i < opt.test_threads; i++)
            printf("T%03d: %8lu keys\n", i, test.ts[i].num_keys);
        if (opt.show)
            goto done;
    }

    err = hse_init(opt.config, hse_gparm.strc, hse_gparm.strv);
    if (err) {
        fprintf(stderr, "failed to initialize kvdb\n");
        goto done;
    }

    err = test_run();
    if (err)
        goto done;

    /*
     * Hack: if #keys == 0, wait 100ms so we can use longtest with no
     * keys to do offline compaction.  Waiting a bit here gives
     * queued compaction tasks in CN a chance to get going.  Once
     * they get going, the unmount will wait for completion.
     */
    if (opt.keys == 0)
        usleep(100 * 1000);

done:
    test_fini();

    pg_destroy(pg);
    svec_reset(&hse_gparm);
    svec_reset(&db_oparms);
    svec_reset(&kv_oparms);

    hse_fini();

    return err;
}
